// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#include <brpc/controller.h>
#include <bvar/window.h>
#include <fmt/core.h>
#include <gen_cpp/cloud.pb.h>
#include <gen_cpp/olap_file.pb.h>
#include <google/protobuf/repeated_field.h>
#include <gtest/gtest.h>

#include <atomic>
#include <condition_variable>
#include <cstdint>
#include <memory>
#include <random>
#include <string>
#include <thread>

#include "common/config.h"
#include "common/logging.h"
#include "common/util.h"
#include "cpp/sync_point.h"
#include "meta-service/meta_service.h"
#include "meta-service/meta_service_helper.h"
#include "meta-store/document_message.h"
#include "meta-store/keys.h"
#include "meta-store/mem_txn_kv.h"
#include "meta-store/meta_reader.h"
#include "meta-store/txn_kv.h"
#include "meta-store/txn_kv_error.h"
#include "meta-store/versioned_value.h"
#include "mock_resource_manager.h"
#include "rate-limiter/rate_limiter.h"
#include "resource-manager/resource_manager.h"

namespace doris::cloud {

// External functions from meta_service_test.cpp
extern std::unique_ptr<MetaServiceProxy> get_meta_service();
extern std::unique_ptr<MetaServiceProxy> get_meta_service(bool mock_resource_mgr);
extern void create_tablet(MetaServiceProxy* meta_service, int64_t table_id, int64_t index_id,
                          int64_t partition_id, int64_t tablet_id);
extern doris::RowsetMetaCloudPB create_rowset(int64_t txn_id, int64_t tablet_id, int partition_id,
                                              int64_t version, int num_rows);
extern void commit_rowset(MetaServiceProxy* meta_service, const doris::RowsetMetaCloudPB& rowset,
                          CreateRowsetResponse& res);
extern void insert_rowset(MetaServiceProxy* meta_service, int64_t db_id, const std::string& label,
                          int64_t table_id, int64_t partition_id, int64_t tablet_id);
extern void add_tablet(CreateTabletsRequest& req, int64_t table_id, int64_t index_id,
                       int64_t partition_id, int64_t tablet_id);
extern void get_tablet_stats(MetaServiceProxy* meta_service, int64_t table_id, int64_t index_id,
                             int64_t partition_id, int64_t tablet_id, GetTabletStatsResponse& res);
extern void create_and_commit_rowset(MetaServiceProxy* meta_service, int64_t table_id,
                                     int64_t index_id, int64_t partition_id, int64_t tablet_id,
                                     int64_t txn_id);

void insert_compact_rowset(Transaction* txn, std::string instance_id, int64_t tablet_id,
                           int64_t partition_id, int64_t start_version, int64_t end_version,
                           int num_rows) {
    doris::RowsetMetaCloudPB compact_rowset =
            create_rowset(1, tablet_id, partition_id, start_version, num_rows);
    compact_rowset.set_end_version(end_version);
    std::string key = versioned::meta_rowset_compact_key({instance_id, tablet_id, end_version});
    ASSERT_TRUE(versioned::document_put(txn, key, std::move(compact_rowset)));
}

void update_tablet_compact_stats(Transaction* txn, std::string instance_id, int64_t tablet_id,
                                 int start_version, int end_version, int num_rows) {
    MetaReader reader(instance_id, nullptr);
    std::vector<RowsetMetaCloudPB> rowset_metas;
    ASSERT_EQ(reader.get_rowset_metas(txn, tablet_id, start_version, end_version, &rowset_metas),
              TxnErrorCode::TXN_OK);

    int input_data_size = 0;
    int input_num_rows = 0;
    int input_num_rowsets = 0;
    int input_num_segments = 0;
    int input_index_size = 0;
    int input_segment_size = 0;
    for (const auto& rowset_meta : rowset_metas) {
        input_data_size += rowset_meta.total_disk_size();
        input_num_rows += rowset_meta.num_rows();
        input_num_rowsets += 1; // Each rowset is considered as one rowset
        input_num_segments += rowset_meta.num_segments();
        input_index_size += rowset_meta.index_disk_size();
        input_segment_size += rowset_meta.data_disk_size();
    }

    std::string key = versioned::tablet_compact_stats_key({instance_id, tablet_id});
    TabletStatsPB stats;
    ASSERT_EQ(versioned::document_get(txn, key, &stats, nullptr), TxnErrorCode::TXN_OK);

    // See create_rowset() for the calculation of data size
    stats.set_data_size(stats.data_size() + num_rows * 110 - input_data_size);
    stats.set_num_rows(stats.num_rows() + num_rows - input_num_rows);
    stats.set_num_rowsets(stats.num_rowsets() + 1 - input_num_rowsets);
    stats.set_num_segments(stats.num_segments() + 1 - input_num_segments);
    stats.set_index_size(stats.index_size() + num_rows * 10 - input_index_size);
    stats.set_segment_size(stats.segment_size() + num_rows * 100 - input_segment_size);
    ASSERT_TRUE(versioned::document_put(txn, key, std::move(stats)));
}

void compact_rowset(TxnKv* txn_kv, std::string instance_id, int64_t tablet_id, int64_t partition_id,
                    int64_t start_version, int64_t end_version, int num_rows) {
    std::unique_ptr<Transaction> txn;
    ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
    insert_compact_rowset(txn.get(), instance_id, tablet_id, partition_id, start_version,
                          end_version, num_rows);
    update_tablet_compact_stats(txn.get(), instance_id, tablet_id, start_version, end_version,
                                num_rows);
    ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
}

// Create a MULTI_VERSION_READ_WRITE instance and refresh the resource manager.
static void create_and_refresh_instance(MetaServiceProxy* service, std::string instance_id) {
    // write instance
    InstanceInfoPB instance_info;
    instance_info.set_instance_id(instance_id);
    instance_info.set_multi_version_status(MULTI_VERSION_READ_WRITE);
    std::unique_ptr<Transaction> txn;
    ASSERT_EQ(service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
    txn->put(instance_key(instance_id), instance_info.SerializeAsString());
    ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);

    service->resource_mgr()->refresh_instance(instance_id);
    ASSERT_TRUE(service->resource_mgr()->is_version_write_enabled(instance_id));
}

#define MOCK_GET_INSTANCE_ID(instance_id)                                          \
    DORIS_CLOUD_DEFER {                                                            \
        SyncPoint::get_instance()->clear_all_call_backs();                         \
    };                                                                             \
    SyncPoint::get_instance()->set_call_back("get_instance_id", [&](auto&& args) { \
        auto* ret = try_any_cast_ret<std::string>(args);                           \
        ret->first = instance_id;                                                  \
        ret->second = true;                                                        \
    });                                                                            \
    SyncPoint::get_instance()->enable_processing();

TEST(MetaServiceVersionedReadTest, CommitTxn) {
    auto meta_service = get_meta_service(false);
    std::string instance_id = "test_cloud_instance_id";
    std::string cloud_unique_id = "1:test_cloud_unique_id:1";
    MOCK_GET_INSTANCE_ID(instance_id);
    create_and_refresh_instance(meta_service.get(), instance_id);

    int64_t db_id = 666;
    int64_t table_id = 1234;
    int64_t index_id = 1235;
    int64_t partition_id = 1236;

    // case: first version of rowset
    {
        int64_t txn_id = -1;
        // begin txn
        {
            brpc::Controller cntl;
            BeginTxnRequest req;
            req.set_cloud_unique_id(cloud_unique_id);
            TxnInfoPB txn_info_pb;
            txn_info_pb.set_db_id(db_id);
            txn_info_pb.set_label("test_label");
            txn_info_pb.add_table_ids(table_id);
            txn_info_pb.set_timeout_ms(36000);
            req.mutable_txn_info()->CopyFrom(txn_info_pb);
            BeginTxnResponse res;
            meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
                                    &req, &res, nullptr);
            ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
            txn_id = res.txn_id();
        }

        // mock rowset and tablet
        int64_t tablet_id_base = 1103;
        for (int i = 0; i < 5; ++i) {
            create_tablet(meta_service.get(), table_id, index_id, partition_id, tablet_id_base + i);
            auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i, partition_id, -1, 100);
            CreateRowsetResponse res;
            commit_rowset(meta_service.get(), tmp_rowset, res);
            ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
        }

        // precommit txn
        {
            brpc::Controller cntl;
            PrecommitTxnRequest req;
            req.set_cloud_unique_id(cloud_unique_id);
            req.set_db_id(db_id);
            req.set_txn_id(txn_id);
            req.set_precommit_timeout_ms(36000);
            PrecommitTxnResponse res;
            meta_service->precommit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
                                        &req, &res, nullptr);
            ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
        }

        // commit txn
        {
            brpc::Controller cntl;
            CommitTxnRequest req;
            req.set_cloud_unique_id(cloud_unique_id);
            req.set_db_id(db_id);
            req.set_txn_id(txn_id);
            CommitTxnResponse res;
            meta_service->commit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
                                     &req, &res, nullptr);
            ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
        }

        // doubly commit txn
        {
            brpc::Controller cntl;
            CommitTxnRequest req;
            req.set_cloud_unique_id(cloud_unique_id);
            req.set_db_id(db_id);
            req.set_txn_id(txn_id);
            CommitTxnResponse res;
            meta_service->commit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
                                     &req, &res, nullptr);
            ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
            auto found = res.status().msg().find(fmt::format(
                    "transaction is already visible: db_id={} txn_id={}", db_id, txn_id));
            ASSERT_TRUE(found != std::string::npos);
        }

        // doubly commit txn(2pc)
        {
            brpc::Controller cntl;
            CommitTxnRequest req;
            req.set_cloud_unique_id("test_cloud_unique_id");
            req.set_db_id(db_id);
            req.set_txn_id(txn_id);
            req.set_is_2pc(true);
            CommitTxnResponse res;
            meta_service->commit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
                                     &req, &res, nullptr);
            ASSERT_EQ(res.status().code(), MetaServiceCode::TXN_ALREADY_VISIBLE);
            auto found = res.status().msg().find(
                    fmt::format("transaction [{}] is already visible, not pre-committed.", txn_id));
            ASSERT_TRUE(found != std::string::npos);
        }
    }

    {
        // Get the partition versions
        brpc::Controller cntl;
        GetVersionRequest req;
        req.set_cloud_unique_id(cloud_unique_id);
        req.set_db_id(db_id);
        req.set_table_id(table_id);
        req.set_partition_id(partition_id);
        GetVersionResponse res;
        meta_service->get_version(&cntl, &req, &res, nullptr);
        ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
        ASSERT_EQ(res.version(), 2);
    }
}

TEST(MetaServiceVersionedReadTest, CommitTxnWithSubTxnTest) {
    auto meta_service = get_meta_service(false);
    const std::string instance_id = "test_cloud_instance_id";
    const std::string cloud_unique_id = "1:test_cloud_unique_id:1";
    MOCK_GET_INSTANCE_ID(instance_id);
    create_and_refresh_instance(meta_service.get(), instance_id);

    int64_t db_id = 98131;
    int64_t txn_id = -1;
    int64_t t1 = 10;
    int64_t t1_index = 100;
    int64_t t1_p1 = 11;
    int64_t t1_p1_t1 = 12;
    int64_t t1_p1_t2 = 13;
    int64_t t1_p2 = 14;
    int64_t t1_p2_t1 = 15;
    int64_t t2 = 16;
    int64_t t2_index = 101;
    int64_t t2_p3 = 17;
    int64_t t2_p3_t1 = 18;
    [[maybe_unused]] int64_t t2_p4 = 19;
    [[maybe_unused]] int64_t t2_p4_t1 = 20;
    std::string label = "test_label";
    std::string label2 = "test_label_0";

    // begin txn
    {
        brpc::Controller cntl;
        BeginTxnRequest req;
        req.set_cloud_unique_id(cloud_unique_id);
        TxnInfoPB txn_info_pb;
        txn_info_pb.set_db_id(db_id);
        txn_info_pb.set_label(label);
        txn_info_pb.add_table_ids(t1);
        txn_info_pb.set_timeout_ms(36000);
        req.mutable_txn_info()->CopyFrom(txn_info_pb);
        BeginTxnResponse res;
        meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req,
                                &res, nullptr);
        ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
        txn_id = res.txn_id();
    }

    // mock rowset and tablet: for sub_txn1
    int64_t sub_txn_id1 = txn_id;
    create_and_commit_rowset(meta_service.get(), t1, t1_index, t1_p1, t1_p1_t1, sub_txn_id1);
    create_and_commit_rowset(meta_service.get(), t1, t1_index, t1_p1, t1_p1_t2, sub_txn_id1);
    create_and_commit_rowset(meta_service.get(), t1, t1_index, t1_p2, t1_p2_t1, sub_txn_id1);

    // begin_sub_txn2
    int64_t sub_txn_id2 = -1;
    {
        brpc::Controller cntl;
        BeginSubTxnRequest req;
        req.set_cloud_unique_id("test_cloud_unique_id");
        req.set_txn_id(txn_id);
        req.set_sub_txn_num(0);
        req.set_db_id(db_id);
        req.set_label(label2);
        req.mutable_table_ids()->Add(t1);
        req.mutable_table_ids()->Add(t2);
        BeginSubTxnResponse res;
        meta_service->begin_sub_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
                                    &req, &res, nullptr);
        ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
        ASSERT_EQ(res.txn_info().table_ids().size(), 2);
        ASSERT_EQ(res.txn_info().sub_txn_ids().size(), 1);
        ASSERT_TRUE(res.has_sub_txn_id());
        sub_txn_id2 = res.sub_txn_id();
        ASSERT_EQ(sub_txn_id2, res.txn_info().sub_txn_ids()[0]);
    }
    // mock rowset and tablet: for sub_txn3
    create_and_commit_rowset(meta_service.get(), t2, t2_index, t2_p3, t2_p3_t1, sub_txn_id2);

    // begin_sub_txn3
    int64_t sub_txn_id3 = -1;
    {
        brpc::Controller cntl;
        BeginSubTxnRequest req;
        req.set_cloud_unique_id("test_cloud_unique_id");
        req.set_txn_id(txn_id);
        req.set_sub_txn_num(1);
        req.set_db_id(db_id);
        req.set_label("test_label_1");
        req.mutable_table_ids()->Add(t1);
        req.mutable_table_ids()->Add(t2);
        req.mutable_table_ids()->Add(t1);
        BeginSubTxnResponse res;
        meta_service->begin_sub_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
                                    &req, &res, nullptr);
        ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
        ASSERT_EQ(res.txn_info().table_ids().size(), 3);
        ASSERT_EQ(res.txn_info().sub_txn_ids().size(), 2);
        ASSERT_TRUE(res.has_sub_txn_id());
        sub_txn_id3 = res.sub_txn_id();
        ASSERT_EQ(sub_txn_id3, res.txn_info().sub_txn_ids()[1]);
    }
    // mock rowset and tablet: for sub_txn3
    create_and_commit_rowset(meta_service.get(), t1, t1_index, t1_p1, t1_p1_t1, sub_txn_id3);
    create_and_commit_rowset(meta_service.get(), t1, t1_index, t1_p1, t1_p1_t2, sub_txn_id3);

    // commit txn
    CommitTxnRequest req;
    {
        brpc::Controller cntl;
        req.set_cloud_unique_id("test_cloud_unique_id");
        req.set_db_id(666);
        req.set_txn_id(txn_id);
        req.set_is_txn_load(true);

        SubTxnInfo sub_txn_info1;
        sub_txn_info1.set_sub_txn_id(sub_txn_id1);
        sub_txn_info1.set_table_id(t1);
        sub_txn_info1.mutable_base_tablet_ids()->Add(t1_p1_t1);
        sub_txn_info1.mutable_base_tablet_ids()->Add(t1_p1_t2);
        sub_txn_info1.mutable_base_tablet_ids()->Add(t1_p2_t1);

        SubTxnInfo sub_txn_info2;
        sub_txn_info2.set_sub_txn_id(sub_txn_id2);
        sub_txn_info2.set_table_id(t2);
        sub_txn_info2.mutable_base_tablet_ids()->Add(t2_p3_t1);

        SubTxnInfo sub_txn_info3;
        sub_txn_info3.set_sub_txn_id(sub_txn_id3);
        sub_txn_info3.set_table_id(t1);
        sub_txn_info3.mutable_base_tablet_ids()->Add(t1_p1_t1);
        sub_txn_info3.mutable_base_tablet_ids()->Add(t1_p1_t2);

        req.mutable_sub_txn_infos()->Add(std::move(sub_txn_info1));
        req.mutable_sub_txn_infos()->Add(std::move(sub_txn_info2));
        req.mutable_sub_txn_infos()->Add(std::move(sub_txn_info3));
        CommitTxnResponse res;
        meta_service->commit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req,
                                 &res, nullptr);
        ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
        // std::cout << res.DebugString() << std::endl;
        ASSERT_EQ(res.table_ids().size(), 3);
        for (int i = 0; i < 3; ++i) {
            if (res.partition_ids(i) == t2_p3) {
                ASSERT_EQ(res.table_ids(i), t2);
                ASSERT_EQ(res.versions(i), 2);
            } else if (res.partition_ids(i) == t1_p2) {
                ASSERT_EQ(res.table_ids(i), t1);
                ASSERT_EQ(res.versions(i), 2);
            } else if (res.partition_ids(i) == t1_p1) {
                ASSERT_EQ(res.table_ids(i), t1);
                ASSERT_EQ(res.versions(i), 3);
            } else {
                ASSERT_TRUE(false) << "unknown partition: " << res.partition_ids(i)
                                   << ", res=" << res.DebugString();
            }
        }
    }

    // doubly commit txn
    {
        brpc::Controller cntl;
        CommitTxnResponse res;
        meta_service->commit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req,
                                 &res, nullptr);
        ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
        auto found = res.status().msg().find(
                fmt::format("transaction is already visible: db_id={} txn_id={}", db_id, txn_id));
        ASSERT_TRUE(found != std::string::npos);
    }

    // Verify the partition versions
    {
        brpc::Controller ctrl;
        GetVersionRequest req;
        req.set_cloud_unique_id(cloud_unique_id);
        req.set_batch_mode(true);
        req.add_db_ids(db_id);
        req.add_db_ids(db_id);
        req.add_db_ids(db_id);
        req.add_table_ids(t1);
        req.add_table_ids(t1);
        req.add_table_ids(t2);
        req.add_partition_ids(t1_p1);
        req.add_partition_ids(t1_p2);
        req.add_partition_ids(t2_p3);

        GetVersionResponse resp;
        meta_service->get_version(&ctrl, &req, &resp, nullptr);
        ASSERT_EQ(resp.status().code(), MetaServiceCode::OK)
                << " status is " << resp.status().ShortDebugString();
        ASSERT_EQ(resp.versions().size(), 3);
        ASSERT_EQ(resp.versions()[0], 3); // t1_p1
        ASSERT_EQ(resp.versions()[1], 2); // t1_p2
        ASSERT_EQ(resp.versions()[2], 2); // t2_p3
    }
}

TEST(MetaServiceVersionedReadTest, GetVersion) {
    auto service = get_meta_service(false);

    int64_t table_id = 1;
    int64_t partition_id = 1;
    int64_t tablet_id = 1;

    std::string instance_id = "test_cloud_instance_id";
    std::string cloud_unique_id = fmt::format("1:{}:1", instance_id);

    MOCK_GET_INSTANCE_ID(instance_id);
    create_and_refresh_instance(service.get(), instance_id);

    // INVALID_ARGUMENT
    {
        brpc::Controller ctrl;
        GetVersionRequest req;
        req.set_cloud_unique_id(cloud_unique_id);
        req.set_table_id(table_id);
        req.set_partition_id(partition_id);

        GetVersionResponse resp;
        service->get_version(&ctrl, &req, &resp, nullptr);

        ASSERT_EQ(resp.status().code(), MetaServiceCode::INVALID_ARGUMENT)
                << " status is " << resp.status().DebugString();
    }

    {
        brpc::Controller ctrl;
        GetVersionRequest req;
        req.set_cloud_unique_id(cloud_unique_id);
        req.set_db_id(1);
        req.set_table_id(table_id);
        req.set_partition_id(partition_id);

        GetVersionResponse resp;
        service->get_version(&ctrl, &req, &resp, nullptr);

        ASSERT_EQ(resp.status().code(), MetaServiceCode::VERSION_NOT_FOUND)
                << " status is " << resp.status().DebugString();
    }

    {
        brpc::Controller ctrl;
        GetVersionRequest req;
        req.set_cloud_unique_id(cloud_unique_id);
        req.set_db_id(1);
        req.set_table_id(table_id);
        req.set_partition_id(partition_id);
        req.set_is_table_version(true);

        GetVersionResponse resp;
        service->get_version(&ctrl, &req, &resp, nullptr);

        ASSERT_EQ(resp.status().code(), MetaServiceCode::VERSION_NOT_FOUND)
                << " status is " << resp.status().DebugString();
    }

    create_tablet(service.get(), table_id, 1, partition_id, tablet_id);
    insert_rowset(service.get(), 1, "get_version_label_1", table_id, partition_id, tablet_id);

    {
        brpc::Controller ctrl;
        GetVersionRequest req;
        req.set_cloud_unique_id(cloud_unique_id);
        req.set_db_id(1);
        req.set_table_id(table_id);
        req.set_partition_id(partition_id);

        GetVersionResponse resp;
        service->get_version(&ctrl, &req, &resp, nullptr);

        ASSERT_EQ(resp.status().code(), MetaServiceCode::OK)
                << " status is " << resp.status().DebugString();
        ASSERT_EQ(resp.version(), 2);
    }

    {
        brpc::Controller ctrl;
        GetVersionRequest req;
        req.set_cloud_unique_id(cloud_unique_id);
        req.set_db_id(1);
        req.set_table_id(table_id);
        req.set_partition_id(partition_id);
        req.set_is_table_version(true);

        GetVersionResponse resp;
        service->get_version(&ctrl, &req, &resp, nullptr);

        ASSERT_EQ(resp.status().code(), MetaServiceCode::OK)
                << " status is " << resp.status().DebugString();
        ASSERT_GT(resp.version(), 2);
    }
}

TEST(MetaServiceVersionedReadTest, BatchGetVersion) {
    struct TestCase {
        std::vector<int64_t> table_ids;
        std::vector<int64_t> partition_ids;
        std::vector<int64_t> expected_versions;
        std::vector<
                std::tuple<int64_t /*table_id*/, int64_t /*partition_id*/, int64_t /*tablet_id*/>>
                insert_rowsets;
    };

    // table ids: 2, 3, 4, 5
    // partition ids: 6, 7, 8, 9
    std::vector<TestCase> cases = {
            // all version are missing
            {{1, 2, 3, 4}, {6, 7, 8, 9}, {-1, -1, -1, -1}, {}},
            // update table 1, partition 6
            {{1, 2, 3, 4}, {6, 7, 8, 9}, {2, -1, -1, -1}, {{1, 6, 1}}},
            // update table 2, partition 6
            // update table 3, partition 7
            {{1, 2, 3, 4}, {6, 7, 8, 9}, {2, -1, 2, 2}, {{3, 8, 3}, {4, 9, 4}}},
            // update table 1, partition 7 twice
            {{1, 2, 3, 4}, {6, 7, 8, 9}, {2, 3, 2, 2}, {{2, 7, 2}, {2, 7, 2}}},
    };

    auto service = get_meta_service(false);

    std::string instance_id = "test_cloud_instance_id";
    std::string cloud_unique_id = fmt::format("1:{}:1", instance_id);

    MOCK_GET_INSTANCE_ID(instance_id);
    create_and_refresh_instance(service.get(), instance_id);

    create_tablet(service.get(), 1, 1, 6, 1);
    create_tablet(service.get(), 2, 1, 7, 2);
    create_tablet(service.get(), 3, 1, 8, 3);
    create_tablet(service.get(), 4, 1, 9, 4);

    size_t num_cases = cases.size();
    size_t label_index = 0;
    for (size_t i = 0; i < num_cases; ++i) {
        auto& [table_ids, partition_ids, expected_versions, insert_rowsets] = cases[i];
        for (auto [table_id, partition_id, tablet_id] : insert_rowsets) {
            LOG(INFO) << "insert rowset for table " << table_id << " partition " << partition_id
                      << " table_id " << tablet_id;
            insert_rowset(service.get(), 1, std::to_string(++label_index), table_id, partition_id,
                          tablet_id);
        }

        brpc::Controller ctrl;
        GetVersionRequest req;
        req.set_cloud_unique_id(cloud_unique_id);
        req.set_db_id(-1);
        req.set_table_id(-1);
        req.set_partition_id(-1);
        req.set_batch_mode(true);
        for (size_t i = 0; i < table_ids.size(); ++i) req.add_db_ids(1);
        std::copy(table_ids.begin(), table_ids.end(),
                  google::protobuf::RepeatedFieldBackInserter(req.mutable_table_ids()));
        std::copy(partition_ids.begin(), partition_ids.end(),
                  google::protobuf::RepeatedFieldBackInserter(req.mutable_partition_ids()));

        GetVersionResponse resp;
        service->get_version(&ctrl, &req, &resp, nullptr);

        ASSERT_EQ(resp.status().code(), MetaServiceCode::OK)
                << "case " << i << " status is " << resp.status().msg()
                << ", code=" << resp.status().code();

        std::vector<int64_t> versions(resp.versions().begin(), resp.versions().end());
        EXPECT_EQ(versions, expected_versions) << "case " << i;

        // Batch get table versions
        req.set_is_table_version(true);

        resp.Clear();
        service->get_version(&ctrl, &req, &resp, nullptr);
        ASSERT_EQ(resp.status().code(), MetaServiceCode::OK)
                << "case " << i << " status is " << resp.status().msg()
                << ", code=" << resp.status().code();
        for (size_t j = 0; j < resp.versions_size(); ++j) {
            if (expected_versions[j] == -1) {
                ASSERT_LT(resp.versions(j), 0)
                        << "case " << i << ", j=" << j << ", resp=" << resp.DebugString();
            } else {
                ASSERT_GT(resp.versions(j), 0)
                        << "case " << i << ", j=" << j << ", resp=" << resp.DebugString();
            }
        }
    }

    // INVALID_ARGUMENT
    {
        brpc::Controller ctrl;
        GetVersionRequest req;
        req.set_cloud_unique_id(cloud_unique_id);
        req.set_batch_mode(true);
        GetVersionResponse resp;
        service->get_version(&ctrl, &req, &resp, nullptr);
        ASSERT_EQ(resp.status().code(), MetaServiceCode::INVALID_ARGUMENT)
                << " status is " << resp.status().msg() << ", code=" << resp.status().code();
    }
}

TEST(MetaServiceVersionedReadTest, BatchGetVersionFallback) {
    constexpr size_t N = 100;
    size_t i = 0;
    auto sp = SyncPoint::get_instance();
    DORIS_CLOUD_DEFER {
        SyncPoint::get_instance()->clear_all_call_backs();
    };
    sp->set_call_back("batch_get_version_err", [&](auto&& args) {
        if (i++ == N / 10) {
            *try_any_cast<TxnErrorCode*>(args) = TxnErrorCode::TXN_TOO_OLD;
        }
    });

    sp->enable_processing();

    auto service = get_meta_service(false);
    std::string instance_id = "test_cloud_instance_id";
    std::string cloud_unique_id = fmt::format("1:{}:1", instance_id);

    MOCK_GET_INSTANCE_ID(instance_id);
    create_and_refresh_instance(service.get(), instance_id);

    for (int64_t i = 1; i <= N; ++i) {
        create_tablet(service.get(), 1, 1, i, i);
        insert_rowset(service.get(), 1, std::to_string(i), 1, i, i);
    }

    brpc::Controller ctrl;
    GetVersionRequest req;
    req.set_cloud_unique_id(cloud_unique_id);
    req.set_db_id(-1);
    req.set_table_id(-1);
    req.set_partition_id(-1);
    req.set_batch_mode(true);
    for (size_t i = 1; i <= N; ++i) {
        req.add_db_ids(1);
        req.add_table_ids(1);
        req.add_partition_ids(i);
    }

    GetVersionResponse resp;
    service->get_version(&ctrl, &req, &resp, nullptr);

    ASSERT_EQ(resp.status().code(), MetaServiceCode::OK)
            << "case " << i << " status is " << resp.status().msg()
            << ", code=" << resp.status().code();

    ASSERT_EQ(resp.versions_size(), N);
}

TEST(MetaServiceVersionedReadTest, GetTablet) {
    auto service = get_meta_service(false);
    std::string instance_id = "test_cloud_instance_id";

    MOCK_GET_INSTANCE_ID(instance_id);
    create_and_refresh_instance(service.get(), instance_id);

    int64_t table_id = 2;
    int64_t index_id = 3;
    int64_t partition_id = 4;
    int64_t tablet_id = 5;

    // Create tablet
    create_tablet(service.get(), table_id, index_id, partition_id, tablet_id);

    {
        // Get the tablet meta
        brpc::Controller cntl;
        GetTabletRequest req;
        req.set_cloud_unique_id(fmt::format("1:{}:1", instance_id));
        req.set_tablet_id(tablet_id);
        GetTabletResponse resp;
        service->get_tablet(&cntl, &req, &resp, nullptr);
        ASSERT_EQ(resp.status().code(), MetaServiceCode::OK)
                << " status is " << resp.status().DebugString();
        ASSERT_EQ(resp.tablet_meta().table_id(), table_id);
        ASSERT_EQ(resp.tablet_meta().index_id(), index_id);
        ASSERT_EQ(resp.tablet_meta().partition_id(), partition_id);
        ASSERT_EQ(resp.tablet_meta().tablet_id(), tablet_id);

        // Verify the tablet schema
        ASSERT_TRUE(resp.tablet_meta().has_schema());
    }
}

TEST(MetaServiceVersionedReadTest, GetTabletStats) {
    auto meta_service = get_meta_service(false);

    std::string instance_id = "test_cloud_instance_id";

    MOCK_GET_INSTANCE_ID(instance_id);
    create_and_refresh_instance(meta_service.get(), instance_id);

    constexpr auto table_id = 10001, index_id = 10002, partition_id = 10003, tablet_id = 10004;
    ASSERT_NO_FATAL_FAILURE(
            create_tablet(meta_service.get(), table_id, index_id, partition_id, tablet_id));

    {
        GetTabletStatsResponse res;
        get_tablet_stats(meta_service.get(), table_id, index_id, partition_id, tablet_id, res);
        ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
        ASSERT_EQ(res.tablet_stats_size(), 1);
        EXPECT_EQ(res.tablet_stats(0).data_size(), 0);
        EXPECT_EQ(res.tablet_stats(0).num_rows(), 0);
        EXPECT_EQ(res.tablet_stats(0).num_rowsets(), 1);
        EXPECT_EQ(res.tablet_stats(0).num_segments(), 0);
        EXPECT_EQ(res.tablet_stats(0).index_size(), 0);
        EXPECT_EQ(res.tablet_stats(0).segment_size(), 0);

        // The tablet idx should be set.
        auto tablet_stats = res.tablet_stats(0);
        EXPECT_EQ(tablet_stats.idx().tablet_id(), tablet_id);
        EXPECT_EQ(tablet_stats.idx().index_id(), index_id);
        EXPECT_EQ(tablet_stats.idx().partition_id(), partition_id);
        EXPECT_EQ(tablet_stats.idx().table_id(), table_id);
    }

    {
        // Insert rowset
        config::split_tablet_stats = false;
        ASSERT_NO_FATAL_FAILURE(insert_rowset(meta_service.get(), 10000, "label1", table_id,
                                              partition_id, tablet_id));
        ASSERT_NO_FATAL_FAILURE(insert_rowset(meta_service.get(), 10000, "label2", table_id,
                                              partition_id, tablet_id));
        config::split_tablet_stats = true;
        ASSERT_NO_FATAL_FAILURE(insert_rowset(meta_service.get(), 10000, "label3", table_id,
                                              partition_id, tablet_id));
        ASSERT_NO_FATAL_FAILURE(insert_rowset(meta_service.get(), 10000, "label4", table_id,
                                              partition_id, tablet_id));
        ASSERT_NO_FATAL_FAILURE(insert_rowset(meta_service.get(), 10000, "label5", table_id,
                                              partition_id, tablet_id));
        ASSERT_NO_FATAL_FAILURE(insert_rowset(meta_service.get(), 10000, "label6", table_id,
                                              partition_id, tablet_id));
        ASSERT_NO_FATAL_FAILURE(insert_rowset(meta_service.get(), 10000, "label7", table_id,
                                              partition_id, tablet_id));
    }

    {
        GetTabletStatsResponse res;
        get_tablet_stats(meta_service.get(), table_id, index_id, partition_id, tablet_id, res);
        ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
        ASSERT_EQ(res.tablet_stats_size(), 1);
        EXPECT_EQ(res.tablet_stats(0).data_size(), 77000);
        EXPECT_EQ(res.tablet_stats(0).num_rows(), 700);
        EXPECT_EQ(res.tablet_stats(0).num_rowsets(), 8);
        EXPECT_EQ(res.tablet_stats(0).num_segments(), 7);
        EXPECT_EQ(res.tablet_stats(0).index_size(), 7000);
        EXPECT_EQ(res.tablet_stats(0).segment_size(), 70000);
    }

    {
        // Compact some rowsets, the rows is not changed
        auto txn_kv = meta_service->txn_kv();
        compact_rowset(txn_kv.get(), instance_id, tablet_id, partition_id, 2, 4, 300);
        compact_rowset(txn_kv.get(), instance_id, tablet_id, partition_id, 6, 7, 200);
    }

    {
        // rowset.set_num_segments(1);
        // rowset.set_num_rows(num_rows);
        // rowset.set_data_disk_size(num_rows * 100);
        // rowset.set_index_disk_size(num_rows * 10);
        // rowset.set_total_disk_size(num_rows * 110);
        GetTabletStatsResponse res;
        get_tablet_stats(meta_service.get(), table_id, index_id, partition_id, tablet_id, res);
        ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
        ASSERT_EQ(res.tablet_stats_size(), 1);
        EXPECT_EQ(res.tablet_stats(0).data_size(), 77000);
        EXPECT_EQ(res.tablet_stats(0).num_rows(), 700);
        EXPECT_EQ(res.tablet_stats(0).num_rowsets(), 5); // 0-1, 2-4, 5, 6-7, 8
        EXPECT_EQ(res.tablet_stats(0).num_segments(), 4);
        EXPECT_EQ(res.tablet_stats(0).index_size(), 7000);
        EXPECT_EQ(res.tablet_stats(0).segment_size(), 70000);
    }
}

TEST(MetaServiceVersionedReadTest, GetRowsetMetas) {
    auto service = get_meta_service(false);
    std::string instance_id = "test_cloud_instance_id";

    MOCK_GET_INSTANCE_ID(instance_id);
    create_and_refresh_instance(service.get(), instance_id);

    int64_t db_id = 1;
    int64_t table_id = 2;
    int64_t index_id = 3;
    int64_t partition_id = 4;
    int64_t tablet_id = 5;

    // Create tablet
    create_tablet(service.get(), table_id, index_id, partition_id, tablet_id);

    // Helper function to get rowsets using MetaService get_rowset interface
    auto get_rowsets = [&](int64_t start_version, int64_t end_version) -> GetRowsetResponse {
        brpc::Controller cntl;
        GetRowsetRequest req;
        auto* tablet_idx = req.mutable_idx();
        tablet_idx->set_db_id(db_id);
        tablet_idx->set_table_id(table_id);
        tablet_idx->set_index_id(index_id);
        tablet_idx->set_partition_id(partition_id);
        tablet_idx->set_tablet_id(tablet_id);
        req.set_start_version(start_version);
        req.set_end_version(end_version);
        req.set_cumulative_compaction_cnt(0);
        req.set_base_compaction_cnt(0);
        req.set_cumulative_point(2);

        GetRowsetResponse resp;
        service->get_rowset(&cntl, &req, &resp, nullptr);
        return resp;
    };

    // Test 1: Contains the first rowset
    {
        auto resp = get_rowsets(1, 10);
        ASSERT_EQ(resp.status().code(), MetaServiceCode::OK) << resp.status().DebugString();
        ASSERT_EQ(resp.rowset_meta_size(), 1);
        EXPECT_TRUE(resp.has_partition_max_version());
        EXPECT_EQ(resp.partition_max_version(), -1); // The first version is special
    }

    // Step 1: Insert loading rowsets with versions 2, 3, 4, 5, 6, 7, 8, 9, 10
    LOG(INFO) << "Inserting loading rowsets for versions 2-10";
    for (int64_t version = 2; version <= 10; ++version) {
        insert_rowset(service.get(), 1, fmt::format("load_label_{}", version), table_id,
                      partition_id, tablet_id);
    }

    // Test 2: Get all loading rowsets
    {
        auto resp = get_rowsets(1, 10);
        ASSERT_EQ(resp.status().code(), MetaServiceCode::OK) << resp.status().DebugString();
        ASSERT_EQ(resp.rowset_meta_size(), 10);
        EXPECT_TRUE(resp.has_partition_max_version());
        EXPECT_EQ(resp.partition_max_version(), 10);

        // Verify rowsets are sorted by version
        for (int i = 0; i < resp.rowset_meta_size(); ++i) {
            const auto& meta = resp.rowset_meta(i);
            ASSERT_EQ(meta.end_version(), i + 1) << meta.DebugString();
        }
    }

    // Step 2: Insert compact rowsets [3-5] and [9-10]
    LOG(INFO) << "Inserting compact rowsets [3-5] and [9-10]";

    // Create compact rowset [3-4]
    {
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
        insert_compact_rowset(txn.get(), instance_id, tablet_id, partition_id, 3, 4, 300);
        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
    }

    // Create compact rowset [3-5]
    {
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
        insert_compact_rowset(txn.get(), instance_id, tablet_id, partition_id, 3, 5, 300);
        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
    }

    // Create compact rowset [9-10]
    {
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
        insert_compact_rowset(txn.get(), instance_id, tablet_id, partition_id, 9, 10, 190);
        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
    }

    // Test 3: Verify compact rowsets override load rowsets
    {
        auto resp = get_rowsets(1, 10);
        ASSERT_EQ(resp.status().code(), MetaServiceCode::OK);

        // 1, 2, [3-5], 6, 7, 8, [9-10]
        ASSERT_EQ(resp.rowset_meta_size(), 7) << resp.ShortDebugString();

        // Expected sequence: v1, v2, [3-5], v6, v7, v8, [9-10]
        std::vector<std::pair<int64_t, int64_t>> expected_versions = {
                {0, 1}, {2, 2}, {3, 5}, {6, 6}, {7, 7}, {8, 8}, {9, 10}};

        for (int i = 0; i < resp.rowset_meta_size(); ++i) {
            const auto& meta = resp.rowset_meta(i);
            ASSERT_EQ(meta.start_version(), expected_versions[i].first)
                    << "Rowset " << i << " start_version mismatch";
            ASSERT_EQ(meta.end_version(), expected_versions[i].second)
                    << "Rowset " << i << " end_version mismatch";
        }
    }

    // Test 4: Boundary cases - query ranges that test boundaries
    struct TestCase {
        int64_t start_version;
        int64_t end_version;
        std::vector<std::pair<int64_t, int64_t>> expected_ranges;
        std::string description;
    };

    std::vector<TestCase> test_cases = {
            // Test exact version match
            {1, 1, {{0, 1}}, "Single version 1"},
            {2, 2, {{2, 2}}, "Single version 2"},

            // Test range before compact
            {1, 2, {{0, 1}, {2, 2}}, "Range [1-2] before compact"},

            // Test range ending at compact boundary
            {2, 5, {{2, 2}, {3, 5}}, "Range [2-5] ending at compact boundary"},

            // Test range starting at compact boundary
            {3, 6, {{3, 5}, {6, 6}}, "Range [3-6] starting at compact boundary"},

            // Test exact compact range
            {3, 5, {{3, 5}}, "Exact compact range [3-5]"},
            {9, 10, {{9, 10}}, "Exact compact range [9-10]"},

            // Test range spanning compact
            {2, 6, {{2, 2}, {3, 5}, {6, 6}}, "Range [2-6] spanning compact"},

            // Test range between compacts
            {6, 8, {{6, 6}, {7, 7}, {8, 8}}, "Range [6-8] between compacts"},

            // Test range crossing second compact
            {8, 10, {{8, 8}, {9, 10}}, "Range [8-10] crossing second compact"},

            // Test partial compact overlap
            {4, 6, {{3, 5}, {6, 6}}, "Range [4-6] partial compact overlap"},
            {9, 10, {{9, 10}}, "Range [9-10] contains last version"},

            // Test edge cases
            {5,
             10,
             {{3, 5}, {6, 6}, {7, 7}, {8, 8}, {9, 10}},
             "Range [5-10] crossing both compacts"},
            {5, 9, {{3, 5}, {6, 6}, {7, 7}, {8, 8}, {9, 9}}, "Range [5-9] includes first compact"},
            {3, 4, {{3, 4}}, "Range [3-4] within first compact"},
    };

    for (const auto& test_case : test_cases) {
        LOG(INFO) << "Testing: " << test_case.description;
        auto resp = get_rowsets(test_case.start_version, test_case.end_version);
        ASSERT_EQ(resp.status().code(), MetaServiceCode::OK)
                << "Failed for " << test_case.description;

        ASSERT_EQ(resp.rowset_meta_size(), test_case.expected_ranges.size())
                << "Rowset count mismatch for " << test_case.description << ". Got "
                << resp.rowset_meta_size() << " rowsets, expected "
                << test_case.expected_ranges.size();

        for (int i = 0; i < resp.rowset_meta_size(); ++i) {
            const auto& meta = resp.rowset_meta(i);
            const auto& expected = test_case.expected_ranges[i];
            ASSERT_EQ(meta.start_version(), expected.first)
                    << "Start version mismatch for " << test_case.description << " at index " << i;
            ASSERT_EQ(meta.end_version(), expected.second)
                    << "End version mismatch for " << test_case.description << " at index " << i
                    << " \nRowsetMeta: " << meta.ShortDebugString()
                    << " \nResponse: " << resp.ShortDebugString();
        }
    }

    // Test 5: Test empty ranges
    {
        auto resp = get_rowsets(11, 15);
        ASSERT_EQ(resp.status().code(), MetaServiceCode::OK);
        ASSERT_EQ(resp.rowset_meta_size(), 0) << "Should return empty for non-existent versions";
    }

    LOG(INFO) << "GetRowsetMetas test completed successfully";
}

TEST(MetaServiceVersionedReadTest, UpdateTablet) {
    auto service = get_meta_service(false);
    std::string instance_id = "test_cloud_instance_id";
    std::string cloud_unique_id = fmt::format("1:{}:1", instance_id);

    MOCK_GET_INSTANCE_ID(instance_id);
    create_and_refresh_instance(service.get(), instance_id);

    constexpr auto table_id = 11231, index_id = 11232, partition_id = 11233, tablet_id1 = 11234,
                   tablet_id2 = 21234;
    ASSERT_NO_FATAL_FAILURE(
            create_tablet(service.get(), table_id, index_id, partition_id, tablet_id1));
    ASSERT_NO_FATAL_FAILURE(
            create_tablet(service.get(), table_id, index_id, partition_id, tablet_id2));
    auto get_and_check_tablet_meta = [&](int tablet_id, int64_t ttl_seconds, bool in_memory,
                                         bool is_persistent) {
        brpc::Controller cntl;
        GetTabletRequest req;
        req.set_cloud_unique_id(cloud_unique_id);
        req.set_tablet_id(tablet_id);
        GetTabletResponse resp;
        service->get_tablet(&cntl, &req, &resp, nullptr);
        ASSERT_EQ(resp.status().code(), MetaServiceCode::OK) << tablet_id;
        EXPECT_EQ(resp.tablet_meta().ttl_seconds(), ttl_seconds);
        EXPECT_EQ(resp.tablet_meta().is_in_memory(), in_memory);
        EXPECT_EQ(resp.tablet_meta().is_persistent(), is_persistent);
    };
    get_and_check_tablet_meta(tablet_id1, 0, false, false);
    get_and_check_tablet_meta(tablet_id2, 0, false, false);
    {
        brpc::Controller cntl;
        UpdateTabletRequest req;
        UpdateTabletResponse resp;
        req.set_cloud_unique_id(cloud_unique_id);
        TabletMetaInfoPB* tablet_meta_info = req.add_tablet_meta_infos();
        tablet_meta_info->set_tablet_id(tablet_id1);
        tablet_meta_info->set_ttl_seconds(300);
        tablet_meta_info = req.add_tablet_meta_infos();
        tablet_meta_info->set_tablet_id(tablet_id2);
        tablet_meta_info->set_ttl_seconds(3000);
        service->update_tablet(&cntl, &req, &resp, nullptr);
        ASSERT_EQ(resp.status().code(), MetaServiceCode::OK);
    }
    get_and_check_tablet_meta(tablet_id1, 300, false, false);
    get_and_check_tablet_meta(tablet_id2, 3000, false, false);
    {
        brpc::Controller cntl;
        UpdateTabletRequest req;
        UpdateTabletResponse resp;
        req.set_cloud_unique_id(cloud_unique_id);
        TabletMetaInfoPB* tablet_meta_info = req.add_tablet_meta_infos();
        tablet_meta_info->set_tablet_id(tablet_id1);
        tablet_meta_info->set_is_in_memory(true);
        service->update_tablet(&cntl, &req, &resp, nullptr);
        ASSERT_EQ(resp.status().code(), MetaServiceCode::OK);
    }
    {
        brpc::Controller cntl;
        UpdateTabletRequest req;
        UpdateTabletResponse resp;
        req.set_cloud_unique_id(cloud_unique_id);
        TabletMetaInfoPB* tablet_meta_info = req.add_tablet_meta_infos();
        tablet_meta_info->set_tablet_id(tablet_id1);
        tablet_meta_info->set_is_persistent(true);
        service->update_tablet(&cntl, &req, &resp, nullptr);
        ASSERT_EQ(resp.status().code(), MetaServiceCode::OK);
    }
    get_and_check_tablet_meta(tablet_id1, 300, true, true);
}

TEST(MetaServiceVersionedReadTest, IndexRequest) {
    auto meta_service = get_meta_service(false);
    std::string instance_id = "commit_index";

    MOCK_GET_INSTANCE_ID(instance_id);
    create_and_refresh_instance(meta_service.get(), instance_id);

    constexpr int64_t db_id = 123;
    constexpr int64_t table_id = 10001;
    constexpr int64_t index_id = 10002;

    {
        // Prepare index
        brpc::Controller ctrl;
        IndexRequest req;
        IndexResponse res;
        req.set_db_id(db_id);
        req.set_table_id(table_id);
        req.add_index_ids(index_id);
        meta_service->prepare_index(&ctrl, &req, &res, nullptr);
        ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << res.status().DebugString();
    }

    {
        // Commit index
        brpc::Controller ctrl;
        IndexRequest req;
        IndexResponse res;
        req.set_db_id(db_id);
        req.set_table_id(table_id);
        req.add_index_ids(index_id);
        req.set_is_new_table(true);
        meta_service->commit_index(&ctrl, &req, &res, nullptr);
        ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << res.status().DebugString();
    }

    {
        // Prepare index again
        brpc::Controller ctrl;
        IndexRequest req;
        IndexResponse res;
        req.set_db_id(db_id);
        req.set_table_id(table_id);
        req.add_index_ids(index_id);
        meta_service->prepare_index(&ctrl, &req, &res, nullptr);
        ASSERT_EQ(res.status().code(), MetaServiceCode::ALREADY_EXISTED)
                << res.status().DebugString();
    }

    {
        // Commit index again
        brpc::Controller ctrl;
        IndexRequest req;
        IndexResponse res;
        req.set_db_id(db_id);
        req.set_table_id(table_id);
        req.add_index_ids(index_id);
        req.set_is_new_table(true);
        meta_service->commit_index(&ctrl, &req, &res, nullptr);
        ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << res.status().DebugString();
    }
}

TEST(MetaServiceVersionedReadTest, PartitionRequest) {
    auto meta_service = get_meta_service(false);
    std::string instance_id = "PartitionRequest";

    MOCK_GET_INSTANCE_ID(instance_id);
    create_and_refresh_instance(meta_service.get(), instance_id);

    constexpr int64_t db_id = 1;
    constexpr int64_t table_id = 10001;
    constexpr int64_t index_id = 10002;
    constexpr int64_t partition_id = 10003;

    {
        // Prepare transaction
        brpc::Controller ctrl;
        PartitionRequest req;
        PartitionResponse res;
        req.set_db_id(db_id);
        req.set_table_id(table_id);
        req.add_index_ids(index_id);
        req.add_partition_ids(partition_id);
        meta_service->prepare_partition(&ctrl, &req, &res, nullptr);
        ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << res.status().DebugString();
    }

    {
        // Commit partition
        brpc::Controller ctrl;
        PartitionRequest req;
        PartitionResponse res;
        req.set_db_id(db_id);
        req.set_table_id(table_id);
        req.add_index_ids(index_id);
        req.add_partition_ids(partition_id);
        meta_service->commit_partition(&ctrl, &req, &res, nullptr);
        ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << res.status().DebugString();
    }

    {
        // Prepare partition again
        brpc::Controller ctrl;
        PartitionRequest req;
        PartitionResponse res;
        req.set_db_id(db_id);
        req.set_table_id(table_id);
        req.add_index_ids(index_id);
        req.add_partition_ids(partition_id);
        meta_service->prepare_partition(&ctrl, &req, &res, nullptr);
        ASSERT_EQ(res.status().code(), MetaServiceCode::ALREADY_EXISTED)
                << res.status().DebugString();
    }

    {
        // Commit partition again
        brpc::Controller ctrl;
        PartitionRequest req;
        PartitionResponse res;
        req.set_db_id(db_id);
        req.set_table_id(table_id);
        req.add_index_ids(index_id);
        req.add_partition_ids(partition_id);
        meta_service->commit_partition(&ctrl, &req, &res, nullptr);
        ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << res.status().DebugString();
    }
}

} // namespace doris::cloud
